Java NIO - Netty Decoder 与 Encoder 核心组件

在 Netty 中,数据的流动就像工厂的流水线,核心任务之一是实现二进制数据(ByteBuf)与业务对象(POJO)之间的互相转换:

  • 入站处理 (Inbound) 需要实现字节到对象的转变(拆包解密):底层 Java 通道读取到 ByteBuf 二进制数据,进入流水线。通过 Decoder(解码器),将晦涩难懂的二进制字节码“翻译”成程序可以直接处理的 POJO 业务对象,以实现让后续的业务逻辑处理器能够直接读取结构化数据。
  • 出站处理 (Outbound) 需要实现从对象到字节的转变(封包加密):业务处理完成后,产生一个 POJO 业务对象准备发送。通过 Encoder(编码器),将该对象 “打包” 还原成能够通过网络传输的 ByteBuf 二进制数据,最终交给底层 Java 通道,发送至远端设备。


解码 ByteToMessageDecoder

简介

Netty 内置了 ByteToMessageDecoder 解码器,它是一个抽象基类,实现了解码处理的基础逻辑和流程。Netty 中的解码器都是 Inbound 入站处理器类型,都直接或者间接地实现了入站处理的超级接口 ChannelInboundHandler。

ByteToMessageDecoder 解码的流程大致如下。

  • 首先,它将上一站传过来的输入到ByteBuf中的数据进行解码,解码出一个 List 对象列表;
  • 然后,迭代 List 列表,逐个将 POJO 对象传入下一站 Inbound 入站处理器。

ByteToMessageDecoder 的解码方法为 decode(),是一个抽象方 法。也就是说,对于decode() 方法的具体解码过程, ByteToMessageDecoder 没有具体的实现,如何将 ByteBuf 中的字节数据 变成什么样的 Object 实例(包含多少个Object实例)需要子类去完成。所以,作为解码器的父类,ByteToMessageDecoder 仅仅提供了一 个整体框架:它会调用子类的 decode() 方法,完成具体的二进制字节解码,然后会获取子类解码之后的 Object 结果,放入自己内部的结果列表List中,最终父类负责将List中的元素一个一个地传递给下一站。从这个角度来说,ByteToMessageDecoder 在设计上使用了模板模式。

ByteToMessageDecoder 的子类要做的是将从入站 ByteBuf解码出来的所有 Object 实例加入父类的 List 列表中。实现一个解码器,首先要继承 ByteToMessageDecoder 抽象类,然后实现其基类的decode() 抽象方法。总体来说,流程大致如下:

  • 继承 ByteToMessageDecoder 抽象类。
  • 实现基类的 decode() 抽象方法,将 ByteBuf 到目标 POJO 的解码逻辑写入此方法,以将 ByteBuf 中的二进制数据解码成一个一个的 POJO 对象。
  • 解码完成后,需要将解码后的 POJO 对象放入decode 方法的 List 实参中,此实参是父类所传入的解码结果收集容器。
  • 余下的工作都由父类 ByteToMessageDecoder 自动完成。在流水线的处理过程中,父类在执行完子类的解码后,会将 List 收集到的结果一个一个地传递到下一个 Inbound 入站处理器。


自定义整数解码器

下面是一个小小的 ByteToMessageDecoder子类的实战案例:整数 解码器。其功能是将ByteBuf中的字节解码成整数类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
/**
* 1. 在decode()方法中,通过ByteBuf的readInt()实例方法从输入缓冲区读取整数,其作用是将二进制数据解码成一个一个的整数。
* 2. 将解码后的整数增加到 decode() 方法的 List<Object> 列表参数中。
* 3. decode()方法不断地循环解码,并且不断地添加到 List<Object> 结果容器中。
* 4. decode()方法处理完成后,基类会继续后面的传递处理:将List<Object>结果列表中所得到的整数一个一个地传递到下一个Inbound入站处理器。
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 4) { // 进行长度判断
int i = in.readInt();
System.out.println("解码出一个整数:" + i);
out.add(i);
}
}
}

如何使用这个自定义的Byte2IntegerDecoder解码器呢?首先,需要将其加入通道流水线中;其次,由于解码器的功能仅仅是完成 ByteBuf的解码,不做其他业务处理,所以还需要编写一个业务处理器,用于在读取解码后的 Java POJO 对象之后完成具体的业务处理。 这里编写一个简单的配套处理器 IntegerProcessHandler,用于处理Byte2IntegerDecoder 解码之后的整数。其功能是:读取上一站的入站数据,把它转换成整数,并且输出到控制台上。配套处理器的代码如下:

1
2
3
4
5
6
7
public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Integer integer = (Integer) msg;
System.out.println("打印出一个整数: " + integer);
}
}

测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Byte2IntegerDecoderTest {
@Test
public void testByteToIntegerDecoder() {
ChannelInitializer<EmbeddedChannel> channelInit = new ChannelInitializer<>() {
protected void initChannel(EmbeddedChannel ch) {
// 这里请注意先后次序:Byte2IntegerDecoder解码器在前,
// IntegerProcessHandler处理器在后。因为入站处理的次序为从前到后。
ch.pipeline().addLast(new Byte2IntegerDecoder());
ch.pipeline().addLast(new IntegerProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
for (int i = 0; i < 3; i++) {
ByteBuf buf = Unpooled.buffer();
buf.writeInt(i);
channel.writeInbound(buf);
}
}
}
1
2
3
4
5
6
解码出一个整数:0
打印出一个整数: 0
解码出一个整数:1
打印出一个整数: 1
解码出一个整数:2
打印出一个整数: 2

最后说明一下:ByteToMessageDecoder 传递给下一站的是解码之后的 Java POJO 对象,不是 ByteBuf 缓冲区。那么问题来了,ByteBuf 缓冲区并没有发送到流水线的 TailContext(尾部处理器),将由谁负责释放引用计数呢?其实,基类 ByteToMessageDecoder 会完成 ByteBuf 释放工作,它会调用 ReferenceCountUtil.release(in)方法将之前的 ByteBuf 缓冲区的引用计数减1。这个 ByteBuf 先被释放了,如果在后面还需要用到,怎么办?可以 在子类的 decode() 方法中调用一次 ReferenceCountUtil.retain(in )来增加一次引用计数,不过在使用完成后要及时将增加的这次计数减去。


解码基类 ReplayingDecoder

判断功能

使用上面的 Byte2IntegerDecoder 整数解码器会面临一个问题:需要对 ByteBuf 的长度进行检查,有足够的字节才能进行整数的读取。这种长度的判断是否可以由 Netty 来帮忙完成呢?答案是可以的,可以使用 Netty 的ReplayingDecoder 类省去长度的判断。ReplayingDecoder 类是 ByteToMessageDecoder 的子类,作用是:

  • 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。
  • 若ByteBuf中有足够的字节,则会正常读取;反之,则会停止解码。
1
2
3
4
5
6
7
8
public class Byte2IntegerReplayDecoder extends ReplayingDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
int i = in.readInt();
System.out.println("解码出一个整数: " + i);
out.add(i);
}
}

继承 ReplayingDecoder 实现一个解码器,就不用编写长度判断的代码。ReplayingDecoder 进行长度判断的原理很简单:内部定义一个新的二进制缓冲区类(类名为 ReplayingDecoderBuffer),又对 ByteBuf 缓冲区进行装饰。该装饰器的特点是,在缓冲区真正读数据之前先进行长度的判断:如果长度合格,就读取数据;否则就抛出ReplayError。ReplayingDecoder 捕获到 ReplayError 后会留着数据,等待下一次IO事件到来时再读取。

简单来讲,ReplayingDecoder 对输入的 ByteBuf 进行了 “偷梁换柱”,在将外部传入的 ByteBuf 缓冲区传给子类之前,换成了自己装饰过的 ReplayingDecoderBuffer 缓冲区。也就是说,在示例程序中, Byte2IntegerReplayDecoder 中的 decode() 方法所得到的实参in的直接类型并不是原始的 ByteBuf 类型,而是ReplayingDecoderBuffer 类型。

ReplayingDecoderBuffer 类型首先是一个内部类,其次是继承了 ByteBuf 类型,包装了 ByteBuf 类型的大部分读取方法。 ReplayingDecoderBuffer 对 ByteBuf 主要是进行二进制数据长度的判断,如果长度不足,就抛出异常。这个异常会反过来被 ReplayingDecoder 基类所捕获,将解码工作停掉。实质上,ReplayingDecoder 的作用远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景。


分包解码

我们知道,底层通信协议是分包传输的,一份数据可能分几个数据包到达对端。发送端出去的包在传输过程中会进行多次拆分和组装。接收端收到的包和发送端所发送的包不是一模一样的:在发送端发出4个字符串,Netty 或者 NIO 接收端可能只接收到了3 个ByteBuf 数据缓冲。 在 Java OIO 流式传输中,程序若读不到完整的信息则会一直阻塞,而不会继续执行。在Java的 NIO(具有非阻塞性)中,保证一次性读取到完整的数据则成了一个大问题。

那么,Netty通过什么样的解码器对图中接收端的3个ByteBuf 缓冲数据进行解码,而后得到和发送端一模一样的4个字符串呢?理论 上可以使用 ReplayingDecoder 来解决。在进行数据解析时,如果发现当前ByteBuf中所有可读的数据不够,那么ReplayingDecoder会一直等待,直到可读数据是足够的。这一切都是在ReplayingDecoder内部, 通过与缓冲区装饰器ReplayingDecoderBuffer相互配合完成的。所以,图中展示的字符串错乱问题完全可以通过继承 ReplayingDecoder 基类实现自己的解码器来解决。

图中的问题是字符串传输过程中出现的,并且实现字符串的解码和纠正相对比较复杂。为了好懂,这里先介绍一个简单点的例子 ——整数序列解码,并且将它们两两一组进行相加,重点是,解码过程中需要保持发送时的次序。

要完成上述例子,需要用到ReplayingDecoder的一个很重要的属性——state成员属性。该成员属性的作用是保存当前解码器在解码过程中所处的阶段。在Netty源代码中,该属性的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
// ...

// 缓冲区装饰器
private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
// 重要的成员属性,表示解码过程中所处的阶段,类型为泛型,默认为Object
private S state;
// 默认的构造器,state值为空,没有用到该属性
protected ReplayingDecoder() {
this((Object)null);
}
// 重载的构造器
protected ReplayingDecoder(S initialState) {
// 初始化内部的ByteBuf缓冲区装饰器类
this.replayable = new ReplayingDecoderByteBuf();
// 读指针检查点,默认为-1
this.checkpoint = -1;
// 状态state的默认值为null
this.state = initialState;
}

// ...
}


整数的分包解码

下面先基于ReplayingDecoder基础解码器编写一个整数相加的解码器:解码两个整数,并把这两个数据之和作为解码的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;

/**
* 完成两个整数的提取并求和的过程可以从业务上分成两个阶段。使用state属性来保存目前所处的阶段:
* 如果是第一个阶段,则仅仅提取第一个整数,完成后进入第二个阶段;
* 如果是第二个阶段,则不仅要提取第二个整数,提取后还需要计算相加的结果,并将相加的和作为解码
* 结果输出。只有两个阶段全部完成才表示一次解码工作完成。
*/
public class IntegerAddDecoder extends ReplayingDecoder<IntegerAddDecoder.PHASE> {
enum PHASE {
PHASE_1, // 第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段
PHASE_2 // 第二个阶段,提取第二个整数后,还需要计算相加的结果并输出
}

private int first;
private int second;

public IntegerAddDecoder() {
super(PHASE.PHASE_1); // 在构造函数中,初始化父类的state属性为 PHASE_1,表示第一个阶段
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) { // 判断当前的状态
// 第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段
case PHASE_1:
// 从装饰器 ByteBuf 中读取数据
first = in.readInt();
// 第一步解析成功,进入第二步,设置 state 为第二个阶段
checkpoint(PHASE.PHASE_2);
break;
// 提取第二个整数后还需要计算相加的结果,并将和作为解码的结果输出
case PHASE_2:
second = in.readInt();
Integer sum = first + second;
out.add(sum);
// 进入下一轮解码的第一步,设置 state 为第一个阶段
checkpoint(PHASE.PHASE_1);
break;
default:
break;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
public void testIntegerAddDecoder() {
ChannelInitializer<EmbeddedChannel> channelInit = new ChannelInitializer<>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new IntegerAddDecoder());
ch.pipeline().addLast(new IntegerProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
for (int i = 0; i < 4; i++) {
ByteBuf buf = Unpooled.buffer();
buf.writeInt(i);
channel.writeInbound(buf);
}
}
1
2
3
4
5
6
7
8
PHASE_1:解码出一个整数 0
PHASE_2:解码出一个整数 1
PHASE_2:sum 1
打印出一个整数: 1
PHASE_1:解码出一个整数 2
PHASE_2:解码出一个整数 3
PHASE_2:sum 5
打印出一个整数: 5


字符串的分包解码

在原理上,字符串分包解码和整数分包解码是一样的,所不同的是:整数的长度是固定的,目前在Java中是4字节;字符串的长度是不固定的,是可变的。如何获取字符串的长度信息呢?这是一个小小的难题,和程序所使用的具体传输协议是强相关的。一般来说,在Netty中进行字符串的传输可以采用普通的Head-Content内容传输协议。该协议的规则很简单:

  • 在协议的Head部分放置字符串的字节长度,可以用一个整数类型来描述。
  • 在协议的Content部分,放置字符串的字节数组。

在实际的传输过程中,一个Head-Content内容包在发送端会被编码成一个ByteBuf内容发送包,当到达接收端后可能被分成很多 ByteBuf 接收包。对于这些参差不齐的接收包,如何解码成最初的 ByteBuf 内容发送包来获得Head-Content内容呢?采用 ReplayingDecoder 解码器即可解决。

下面就是基于 ReplayingDecoder 实现自定义的字符串分包解码器的示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;

public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.PHASE> {
// 设置最大帧长度,防止由于错误数据导致申请巨大内存
private static final int MAX_LENGTH = 1024 * 1024;

enum PHASE {
PHASE_1, //第一个阶段:解码出字符串的长度
PHASE_2 //第二个阶段:按照第一个阶段的字符串长度解码出字符串的内容
}

private int length;
private byte[] inBytes;
public StringReplayDecoder() {
super(PHASE.PHASE_1); // 在构造函数中,需要初始化父类的state属性为PHASE_1阶段
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case PHASE_1:
// 第一步,从装饰器ByteBuf中读取字符串的长度
length = in.readInt();

// 防御式编程:校验长度合法性
if (length < 0 || length > MAX_LENGTH) {
throw new DecoderException("非法的长度字段: " + length);
}

inBytes = new byte[length];
// 进入第二步,读取内容,并设置 “读指针检查点” 为当前的 readerIndex 位置
checkpoint(PHASE.PHASE_2);
break;
case PHASE_2:
// 第二步,从装饰器ByteBuf 中读取字符串的内容数组
in.readBytes(inBytes, 0, length);
out.add(new String(inBytes, "UTF-8"));
// 第二步解析成功,进入下一个字符串的解析,并设置 “读指针检查点” 为当前的 readerIndex 位置
checkpoint(PHASE.PHASE_1);
break;
default:
break;
}
}
}
1
2
3
4
5
6
7
public class StringProcessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String s = (String) msg;
System.out.println("打印出一个字符串: " + s);
}
}

测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class StringReplayDecoderTest {
static String content = "密涅瓦的猫头鹰在黄昏起飞。";

@Test
public void testStringReplayDecoder() {
ChannelInitializer<EmbeddedChannel> channelInit = new ChannelInitializer<>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new StringReplayDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
// 待发送字符串content的字节数组
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
// 循环发送100轮,每一轮可以理解为发送一个Head-Content报文
for (int j = 0; j < 5; j++) {//发送100个包
// 每个包为随机 1~3个 "密涅瓦的猫头鹰在黄昏起飞"
int random = new Random().nextInt(3) + 1;
ByteBuf buf = Unpooled.buffer();
// 发送长度:字节数组长度*重复次数
buf.writeInt(bytes.length * random);
// 重复拷贝content的字节数据到发送缓冲区
for (int k = 0; k < random; k++) {
buf.writeBytes(bytes);
}
// 发送内容:发送buf缓冲区
channel.writeInbound(buf);
}
}
}
1
2
3
4
5
打印出一个字符串: 密涅瓦的猫头鹰在黄昏起飞。密涅瓦的猫头鹰在黄昏起飞。
打印出一个字符串: 密涅瓦的猫头鹰在黄昏起飞。
打印出一个字符串: 密涅瓦的猫头鹰在黄昏起飞。密涅瓦的猫头鹰在黄昏起飞。密涅瓦的猫头鹰在黄昏起飞。
打印出一个字符串: 密涅瓦的猫头鹰在黄昏起飞。密涅瓦的猫头鹰在黄昏起飞。
打印出一个字符串: 密涅瓦的猫头鹰在黄昏起飞。密涅瓦的猫头鹰在黄昏起飞。


更常用的分包解码

通过 ReplayingDecoder 解码器,可以正确地解码分包后的 ByteBuf 数据包。但是,在实际开发中不建议继承这个类,原因如下:

  • 不是所有的 ByteBuf 操作都被 ReplayingDecoderBuffer 装饰器类支持,可能有些 ByteBuf 方法在ReplayingDecoder 的 decode() 方法中会抛出 ReplayError 异常。
  • 在数据解码逻辑复杂的应用场景下,ReplayingDecoder 在解码速度上相对较差。因为在ByteBuf长度不够时,ReplayingDecoder会捕获一个ReplayError异常,并会把ByteBuf中的读指针还原到之前的读指针检查点(checkpoint),然后结束这次解析操作,等待下一次 IO 读事件。在网络条件比较糟糕时,一个数据包的解析逻辑会被反复执行多次,此时解析过程是一个消耗 CPU 的操作,解码速度上相对较差。所以,ReplayingDecoder 更多地应用于数据解析逻辑简单的场景。
  • 在数据解析复杂的应用场景下,建议使用前文介绍的解码器 ByteToMessageDecoder 或者其子类。

这里继承 ByteToMessageDecoder 基类,实现一个定制的 Head-Content 协议字符串内容解码器,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* 在上面的示例程序中,在读取数据之前,需要调用 buf.markReaderIndex() 方法标记当前的位置指针,
* 当可读内容不够(buf.readableBytes() < length)时,需要调用 buf.resetReaderIndex() 方法
* 将 readerIndex 读指针恢复到标记位置。
*
* 表面上ByteToMessageDecoder基类是无状态的,不像 ReplayingDecoder 那样需要使用状态位来保存当前的读取阶段,
* 实际上 ByteToMessageDecoder 也是有状态的。其内部有一个二进制字节累积器 cumulation,用来保存没有解析完的
* 二进制内容。所以 ByteToMessageDecoder 及其子类都是有状态的,其实例不能在通道之间共享。在每次初始化通道的
* 流水线时,都要重新创建一个 ByteToMessageDecoder或者它的子类的实例。
*/
public class StringIntegerHeaderDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) {
// 可读字节小于4,消息头还没读满,返回
if (buf.readableBytes() < 4) {
return;
}
// 消息头已经完整。在真正开始从缓冲区读取数据之前,调用 markReaderIndex() 设置 mark
buf.markReaderIndex();
int length = buf.readInt();
// 从缓冲区读出消息头的大小,这会导致 readIndex 读指针变化
// 如果剩余长度不够消息体的大小,则需要重置读指针,下一次从相同的位置处理
// 网络编程中处理半包(拆包)问题的核心逻辑。
// 这里的判断缓冲区里可读字节数居然比 length 还小,这意味着字符串还没传完,是个半包!
if (buf.readableBytes() < length) {
// 【关键动作】把磁头拨回到刚才 markReaderIndex() 标记的地方(即长度字段之前)。
// 因为我们这次“尝试读取”失败了,我们需要把已经读出来的 4 字节长度“吐回去”,假装我们从来没读过这个包。
buf.resetReaderIndex();
// 退出本次 decode 处理。由于我们把指针重置了,当后续更多的数据到达 ByteBuf 时,Netty 会再次调用 decode,
// 那时我们会重新从“长度字段”开始读,直到内容足够长,能凑齐一个完整的包。
return;
}
// 读取数据,编码成字符串
byte[] inBytes = new byte[length];
buf.readBytes(inBytes, 0, length);
out.add(new String(inBytes, StandardCharsets.UTF_8));
}
}


MessageToMessageDecoder

前面的解码器都是将 ByteBuf 缓冲区中的二进制数据解码成 Java的 普通 POJO 对象,那么是否存在一些解码器可以将一种POJO对象解码成另外一种POJO对象呢?答案是存在。与前面不同的是,解码器需要继承一个新的Netty解码器基类MessageToMessageDecoder。在继承它的时候,需要明确的泛型实参,用于指定入站消息的 Java POJO 类型。

MessageToMessageDecoder 同样使用了模板模式,也有一个 decode() 抽象方法,其具体解码的逻辑需要子类去实现。下面通过实现一个整数到字符串转换的解码器演示一下 MessageToMessageDecoder 的使用。代码很简单,如下所示:

1
2
3
4
5
6
public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) {
out.add(String.valueOf(msg));
}
}


常用的内置 Decoder

Netty提供了不少开箱即用的Decoder(解码器),能够满足很多编解码应用场景的需求。下面将几个比较基础的解码器梳理一下。


FixedLengthFrameDecoder

适用场景:每个接收到的数据包(在Netty中是一个ByteBuf实例)的长度都是固定的,例如100字节。在这种场景下,把 FixedLengthFrameDecoder 解码器加到流水线中,它就会把入站ByteBuf数据包拆分成一个个长度为100的数据包, 然后发往下一个channelHandler入站处理器。


LineBasedFrameDecoder

适用场景:每个ByteBuf数据包使用换行符(或者回车换行符)作为边界分隔符。在这种场景下,把LineBasedFrameDecoder解码器加到流水线中,Netty就会使用换行分隔符把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包再发送到下一站。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DemoTest {
static String spliter = "\r\n";
static String content = "密涅瓦的猫头鹰在黄昏起飞。";

@Test
public void testLineBasedFrameDecoder() throws UnsupportedEncodingException {
ChannelInitializer<EmbeddedChannel> i = new ChannelInitializer<>() {
protected void initChannel(EmbeddedChannel ch) {
// 支持配置一个最大长度值,表示解码出来 的ByteBuf能包含的最大字节数。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常。
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(i);
for (int j = 0; j < 5; j++) { //发送5个包
int random = new Random().nextInt(3) + 1;
ByteBuf buf = Unpooled.buffer();
for (int k = 0; k < random; k++) {
buf.writeBytes(content.getBytes(StandardCharsets.UTF_8));
}
//发送"\r\n"回车换行符作为包结束符
buf.writeBytes(spliter.getBytes(StandardCharsets.UTF_8));
channel.writeInbound(buf);
}
}
}


DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本,不同之处在于这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么所接收到的数据包末尾必须带上对应的分隔符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class NettyOpenBoxDecoder {
static String spliter2 = "\t";
static String content = "密涅瓦的猫头鹰在黄昏起飞。";
/**
* LengthFieldBasedFrameDecoder使用实例
*/
@Test
public void testDelimiterBasedFrameDecoder() {
try {
final ByteBuf delimiter = Unpooled.copiedBuffer(spliter2.getBytes(StandardCharsets.UTF_8));
ChannelInitializer channelInit = new
ChannelInitializer<EmbeddedChannel>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringProcessHandler());
}
};
// 省略与前一个实例重复的代码
}
}
}


LengthFieldBasedFrameDecoder

这是一种基于灵活长度的解码器,在ByteBuf数据包中加了一个长度字段,保存了原始数据包的长度,解码时会按照原始数据包长度进行提取。此解码器在所有开箱即用解码器中是最为复杂的一种,同时也比较常用。

LengthFieldBasedFrameDecoder 可以翻译为 “长度字段数据包解码器”。传输内容中的 Length(长度)字段的值是指存放在数据包中要传输内容的字节数。普通的基于 Head-Content 协议的内容传输尽量用内置的 LengthFieldBasedFrameDecoder 来解码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NettyOpenBoxDecoder {
public static final int VERSION = 100;
public static final int MAGICCODE = 234;
static String content = "密涅瓦的猫头鹰在黄昏起飞。";

@Test
public void testLengthFieldBasedFrameDecoder3() {
try {
/**
* maxFrameLength:发送的数据包的最大长度
* lengthFieldOffset:长度字段偏移量。指的是长度字段位于整个数据包内部字节数组中的下标索引值。
* lengthFieldLength:长度字段本身占用的字节数。如果长度字段是一个int整数,则为4;如果长度字段是一个short整数,则为2。
* lengthAdjustment:长度字段的偏移量矫正
* initialBytesToStrip:丢弃的起始字节数
*
* 第1个参数maxFrameLength可以设置为1024,表示数据包的最大长度为1024字节。
* 第2个参数lengthFieldOffset可以设置为2,表示长度字段处于版本号的后面。
* 第3个参数lengthFieldLength可以设置为4,表示长度字段为4字节。
* 第4个参数lengthAdjustment可以设置为4。长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=10-2-4=4。
* 在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——魔数字段的长度。
* 第5个参数initialBytesToStrip可以设置为10,表示获取最终Content内容的字节数组时抛弃最前面的10字节数据。换句话说,长度字段、版本字段、魔数字段的值被抛弃。
*/
final LengthFieldBasedFrameDecoder spliter =
new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);
ChannelInitializer<EmbeddedChannel> channelInit = new ChannelInitializer<>() {
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(spliter);
ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8));
ch.pipeline().addLast(new StringProcessHandler());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
for (int i = 1; i <= 100; i++) {
ByteBuf buf = Unpooled.buffer();
String s = i + "次发送->" + content;
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
buf.writeChar(VERSION); // 2
buf.writeInt(bytes.length); // 4
buf.writeInt(MAGICCODE); // 4
buf.writeBytes(bytes); // content
channel.writeInbound(buf);
}
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


编码 MessageToByteEncoder

在Netty的业务处理完成后,业务处理的结果往往是某个Java POJO对象需要编码成最终的ByteBuf二进制类型,通过流水线写入底层 的Java通道,这就需要用到Encoder(编码器)。 在Netty中,什么叫编码器?首先,编码器是一个Outbound出站处理器,负责处理“出站”数据;其次,编码器将上一站Outbound出站处理器传过来的输入(Input)数据进行编码或者格式转换,然后传递到下一站ChannelOutboundHandler出站处理器。

编码器与解码器相呼应,Netty中的编码器负责将“出站”的某种 Java POJO对象编码成二进制ByteBuf,或者转换成另一种Java POJO对象。 编码器是ChannelOutboundHandler的具体实现类。一个编码器将出站对象编码之后,数据将被传递到下一个ChannelOutboundHandler 出站处理器进行后面的出站处理。 由于最后只有ByteBuf才能写入通道中,因此可以肯定通道流水线上装配的第一个编码器一定是把数据编码成了 ByteBuf 类型。为什么编码成的最终ByteBuf类型数据包的编码器是在流水线的头部,而不是在流水线的尾部呢?原因很简单:出站处理的顺序是从后向前的。

MessageToByteEncoder 是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法 来完成。它的encode()编码方法是一个抽象方法,没有具体的编码逻辑实现,实现encode()抽象方法的工作需要子类去完成。

1
2
3
4
5
6
7
public class Integer2ByteEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out){
out.writeInt(msg);
System.out.println("encoder Integer = " + msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Integer2ByteEncoderTest {
@Test
public void testIntegerToByteDecoder() {
ChannelInitializer<EmbeddedChannel> channelInit = new ChannelInitializer<>() {
@Override
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(new Integer2ByteEncoder());
}
};
EmbeddedChannel channel = new EmbeddedChannel(channelInit);
for (int j = 0; j < 5; j++) {
channel.write(j); // 向通道写入整数
}
channel.flush();
// 取得通道的出站数据包
ByteBuf buf = channel.readOutbound();
while (null != buf) {
System.out.println("o = " + buf.readInt());
buf = channel.readOutbound();
}
}
}
1
2
3
4
5
6
7
8
9
10
encoder Integer = 0
encoder Integer = 1
encoder Integer = 2
encoder Integer = 3
encoder Integer = 4
o = 0
o = 1
o = 2
o = 3
o = 4


ByteToMessageCodec 编解码

前面讲到解码器和编码器是分开实现的。例如,通过继承 ByteToMessageDecoder 基类或者其子类,完成ByteBuf 数据包到 POJO 的解码工作;通过继承基类 MessageToByteEncoder 或者其子类,完成 POJO 到 ByteBuf 数据包的编码工作。总之,具有相反逻辑的编码器和解码器分开实现在两个不同的类中,导致的一个结果是相互配套的编码器和解码器在加入通道的流水线时常常需要分两次添加。现在的问题是:具有相互配套逻辑的编码器和解码器能否放在同一个类中呢?答案是肯定的,这需要用到Netty的新类型—— Codec(编解码器)。

完成POJO到ByteBuf数据包的编解码器基类为 ByteToMessageCodec,它是一个抽象类。从功能上说,继承 ByteToMessageCodec 就等同于继承了 ByteToMessageDecoder 和 MessageToByteEncoder 这两个基类。编解码器ByteToMessageCodec同时包含了编码encode()和解码 decode()两个抽象方法,这两个方法都需要我们自己实现:

  • 编码方法——encode(ChannelHandlerContext, I, ByteBuf)。
  • 解码方法——decode(ChannelHandlerContext, ByteBuf, List)。

下面是一个整数到字节、字节到整数的编解码器,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Byte2IntegerCodec extends ByteToMessageCodec<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {
out.writeInt(msg);
System.out.println("write Integer = " + msg);
}

@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() >= 4) {
int i = in.readInt();
System.out.println("Decoder i= " + i);
out.add(i);
}
}
}

这是编码器和解码器的结合,简单地通过继承的方式将前面编码 器的encode()方法和解码器的decode()方法放在了同一个自定义类 中,这样在逻辑上更加紧密。在使用时,加入流水线时也只需要加入一次。从上面的示例程序可以看出,ByteToMessageCodec编解码器和前面的编码器与解码器分开来实现相比仅仅是少写了一个类,少加入了 一次流水线,在技术、功能上和分开实现、添加到流水线没有任何区别。

对于 POJO 之间进行转换的编码和解码,Netty 将 MessageToMessageEncoder 编码器和MessageToMessageDecoder 解码器进行了简单的整合,整合出一个新的编解码器基类—— MessageToMessageCodec。这个基类同时包含了 encode() 和 decode() 两个抽象方法,用于完成 POJO-TO-POJO 的双向转换,仅仅是使用形式变得简化了。


CombinedChannelDuplexHandler 组合器

前面的编码器和解码器相结合是通过继承完成的。继承的不足之处在于:将编码器和解码器的逻辑强制性地放在同一个类中,在只需要编码或者解码单边操作的流水线上,逻辑上不大合适。编码器和解码器如果要结合起来,除了继承的方法之外,还可以通过组合的方式实现。与继承相比,组合会带来更大的灵活性:编码器和解码器可以捆绑使用,也可以单独使用。

如何把单独实现的编码器和解码器组合起来呢? Netty 提供了一个新的组合器——CombinedChannelDuplexHandler 基类。其用法也很简单,下面通过示例程序来演示如何将前面的整数解码器IntegerFromByteDecoder 和对应的整数编码器 IntegerToByteEncoder 组合起来。代码如下:

1
2
3
4
5
public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<Byte2IntegerDecoder, Integer2ByteEncoder> {
public IntegerDuplexHandler() {
super(new Byte2IntegerDecoder(), new Integer2ByteEncoder());
}
}

只需要继承 CombinedChannelDuplexHandler,而不需要像 ByteToMessageCodec 那样把编码逻辑和解码逻辑都挤在同一个类中, 还是复用原来分开的编码器和解码器实现代码。总之,使用CombinedChannelDuplexHandler 可以保证有了相反逻 辑关系的 encoder 编码器和 decoder 解码器既可以结合使用,又可以分开使用,十分方便。